# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===================================================================
"""A RunConfig subclass with TPU support."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import collections
import json
import os
import tensorflow as tf
from tensorflow.core.protobuf import config_pb2
from tensorflow.python.util.tf_export import estimator_export
from tensorflow_estimator.python.estimator import run_config as run_config_lib
from tensorflow_estimator.python.estimator.tpu import util as util_lib

# pylint: disable=protected-access
# Name of the environment variable whose JSON content is parsed to look up the
# TPU job name; reused from run_config_lib rather than redefined here.
_TF_CONFIG_ENV = run_config_lib._TF_CONFIG_ENV
# Key of the section inside the parsed TF_CONFIG dict that is searched for the
# TPU worker job name; also reused from run_config_lib.
_SERVICE_KEY = run_config_lib._SERVICE_KEY
# Key inside that section whose value becomes the default `tpu_job_name` of a
# TPUConfig when the caller does not pass one.
_TPU_WORKER_JOB_NAME = 'tpu_worker_job_name'
# pylint: enable=protected-access


@estimator_export(v1=['estimator.tpu.InputPipelineConfig'])
class InputPipelineConfig(object):
  r"""Integer constants naming the supported input pipeline modes.

  The values are accepted by the `per_host_input_for_training` and
  `eval_training_input_configuration` arguments of `TPUConfig`. Please see the
  definition of these values in TPUConfig.
  """
  # Per-core input pipeline: `input_fn` is invoked once for each core. The
  # legacy value `per_host_input_for_training=False` is mapped to this constant.
  PER_SHARD_V1 = 1
  # `input_fn` is invoked once on each host. The legacy value
  # `per_host_input_for_training=True` is mapped to this constant.
  PER_HOST_V1 = 2
  # `input_fn` is invoked once per host or once per replica, whichever count is
  # smaller. This is the only mode in which `input_partition_dims` is accepted.
  PER_HOST_V2 = 3
  # `input_fn` is invoked once on host 0 and the tensors are broadcast to all
  # other replicas.
  BROADCAST = 4
  # Like BROADCAST, but each replica receives a slice of the data rather than a
  # whole copy. Accepted by `eval_training_input_configuration`.
  SLICED = 5


@estimator_export(v1=['estimator.tpu.TPUConfig'])
class TPUConfig(
    collections.namedtuple('TPUConfig', [
        'iterations_per_loop',
        'num_shards',
        'num_cores_per_replica',
        'per_host_input_for_training',
        'tpu_job_name',
        'initial_infeed_sleep_secs',
        'input_partition_dims',
        'eval_training_input_configuration',
        'experimental_host_call_every_n_steps',
        'experimental_allow_per_host_v2_parallel_get_next',
        'experimental_feed_hook',
    ])):
  r"""TPU related configuration required by `TPUEstimator`.

  Args:
    iterations_per_loop: This is the number of train steps running in TPU system
      before returning to CPU host for each `Session.run`. This means global
      step is increased `iterations_per_loop` times in one `Session.run`. It is
      recommended to be set as number of global steps for next checkpoint. Note
      that evaluation does not use this value; instead the total eval `steps`
      are run on TPU in a single `Session.run`.
      [Experimental]: `iterations_per_loop` can be specified as a time interval.
        To specify N seconds in one `Session.run`, one can specify it as `Ns`
        and substitute the N with the number of desired seconds.
        Alternatively, the unit of time can also be specified in minutes or
        hours, e.g. `3600s` or `60m` or `1h`.
    num_shards: (Deprecated, ignored by TPUEstimator). The number of model
      replicas in the system. For non-model-parallelism case, this number equals
      the total number of TPU cores. For model-parallelism, the total number of
      TPU cores equals num_cores_per_replica * num_shards.
    num_cores_per_replica: Defaults to `None`, which disables model parallelism.
      An integer which describes the number of TPU cores per model replica. This
      is required by model-parallelism which enables partitioning the model to
      multiple cores. Currently num_cores_per_replica must be 1, 2, 4, 8, 16,
      32, 64, or 128.
    per_host_input_for_training: If `True`, for `PER_HOST_V1`, the `input_fn` is
      invoked once on each host, and the number of hosts must be smaller or
      equal to the number of replicas. For PER_HOST_V2, the `input_fn` is
      invoked once for each host (if the number of hosts is less than the number
      of replicas) or replica (if the number of replicas is less than the number
      of hosts). With the per-core input pipeline configuration, it is invoked
      once for each core. With a global batch size `train_batch_size` in
      `TPUEstimator` constructor, the batch size for each shard is
      `train_batch_size` // #hosts in the `True` or `PER_HOST_V1` mode. In
      `PER_HOST_V2` mode, it is `train_batch_size` // #cores. In `BROADCAST`
      mode, `input_fn` is only invoked once on host 0 and the tensors are
      broadcasted to all other replicas. The batch size equals to
      `train_batch_size`. With the per-core input pipeline configuration, the
      shard batch size is also `train_batch_size` // #cores.
      The legacy values `True` and `False` are stored as `PER_HOST_V1` and
      `PER_SHARD_V1` respectively.
      Note: per_host_input_for_training==PER_SHARD_V1 only supports mode.TRAIN.
    tpu_job_name: The name of the TPU job. Typically, this name is auto-inferred
      within TPUEstimator, however when using ClusterSpec propagation in more
      esoteric cluster configurations, you may need to specify the job name as a
      string. When left unset, the name is looked up in the TF_CONFIG
      environment variable.
    initial_infeed_sleep_secs: The number of seconds the infeed thread should
      wait before enqueueing the first batch. This helps avoid timeouts for
      models that require a long compilation time.
    input_partition_dims: A nested list to describe the partition dims for all
      the tensors from input_fn(). The structure of input_partition_dims must
      match the structure of `features` and `labels` from input_fn(). The total
      number of partitions must match
      `num_cores_per_replica`. For example, if input_fn() returns two tensors:
        images with shape [N, H, W, C] and labels [N]. input_partition_dims =
        [[1, 2, 2, 1], None] will split the images to 4 pieces and feed into 4
        TPU cores. labels tensor are directly broadcasted to all the TPU cores
        since the partition dims is `None`.
      Current limitations: This feature is only supported with the PER_HOST_V2
        input mode.
    eval_training_input_configuration: If `SLICED`, `input_fn` is only invoked
      once on host 0 and the tensors are broadcasted to all other replicas.
      Unlike per_host_input_for_training=BROADCAST, each replica will only get a
      slice of the data instead of a whole copy. If `PER_HOST_V1`, the behaviour
      is determined by per_host_input_for_training.
    experimental_host_call_every_n_steps: Within a training loop, this argument
      sets how often host calls are performed during training. Host calls will
      be evaluated every n steps within a training loop where n is the value of
      this argument.
    experimental_allow_per_host_v2_parallel_get_next: When enabled, allows
      concurrent execution of dataset get next calls when using PER_HOST_V2
      input. May result in a performance increase for models with a small step
      time, but as a consequence TPUEstimator may non-deterministically
      distribute batches to different cores, rather than guaranteeing round
      robin behavior.
    experimental_feed_hook: This is a class which user can provide to the TPU
      estimator to override the default TPUInfeedOutfeedSessionHook
      implementation and add customized implementation to handle infeed outfeed
      logic. If given class is None, TPU estimator uses default
      TPUInfeedOutfeedSessionHook implementation in tpu_estimator.py. If not
      None, TPU estimator uses this customized tpu infeed outfeed session hook
      class rather than the default one.

  Raises:
    ValueError: If `input_partition_dims` does not have one or two elements, is
      used with an input mode other than `PER_HOST_V2`, or is given without
      `num_cores_per_replica`; if `num_cores_per_replica` is not one of 1, 2,
      4, 8, 16, 32, 64, 128; or if `eval_training_input_configuration` is
      neither `PER_HOST_V1` nor `SLICED`. The `util_lib` checks applied to
      `iterations_per_loop`, `num_shards` and `initial_infeed_sleep_secs`
      presumably raise as well on invalid values.
  """

  def __new__(cls,
              iterations_per_loop=2,
              num_shards=None,
              num_cores_per_replica=None,
              per_host_input_for_training=True,
              tpu_job_name=None,
              initial_infeed_sleep_secs=None,
              input_partition_dims=None,
              eval_training_input_configuration=InputPipelineConfig.PER_HOST_V1,
              experimental_host_call_every_n_steps=1,
              experimental_allow_per_host_v2_parallel_get_next=False,
              experimental_feed_hook=None):
    """Validates the arguments and builds the immutable config tuple."""

    # Check iterations_per_loop. The call is made for validation only: its
    # return value is unused and the caller's original value is stored.
    util_lib.parse_iterations_per_loop(iterations_per_loop)

    # Check num_shards.
    if num_shards is not None:
      util_lib.check_positive_integer(num_shards, 'TPUConfig num_shards')

    if input_partition_dims is not None:
      if len(input_partition_dims) not in (1, 2):
        raise ValueError(
            'input_partition_dims must be a list/tuple with one or two'
            ' elements.')

      # Compare by value rather than identity: `is not` on an int only works
      # through CPython's small-int cache and rejects equal non-identical
      # values. The legacy booleans True/False still fail this check, as
      # neither equals PER_HOST_V2.
      if per_host_input_for_training != InputPipelineConfig.PER_HOST_V2:
        raise ValueError(
            'input_partition_dims is only supported in PER_HOST_V2 mode.')

      if num_cores_per_replica is None:
        raise ValueError(
            'input_partition_dims requires setting num_cores_per_replica.')

    # Check num_cores_per_replica
    if num_cores_per_replica is not None:
      if num_cores_per_replica not in (1, 2, 4, 8, 16, 32, 64, 128):
        raise ValueError(
            'num_cores_per_replica must be 1, 2, 4, 8, 16, 32, 64, 128; '
            'got {}'.format(str(num_cores_per_replica)))

    if eval_training_input_configuration not in [
        InputPipelineConfig.PER_HOST_V1, InputPipelineConfig.SLICED
    ]:
      raise ValueError(
          'eval_training_input_configuration must be PER_HOST_V1 or SLICED;'
          ' got {}'.format(str(eval_training_input_configuration)))

    # per_host_input_for_training may be True, False, or one of the
    # InputPipelineConfig constants. Map legacy values (True, False) to numeric
    # values. Identity checks are intentional here: `True == 1` and
    # `False == 0`, so an equality test could not tell the legacy booleans
    # apart from integers. No range check is applied to the resulting value.
    if per_host_input_for_training is False:
      per_host_input_for_training = InputPipelineConfig.PER_SHARD_V1
    elif per_host_input_for_training is True:
      per_host_input_for_training = InputPipelineConfig.PER_HOST_V1

    # Check initial_infeed_sleep_secs. Falsy values (None and 0) skip the
    # check and are stored unchanged.
    if initial_infeed_sleep_secs:
      util_lib.check_positive_integer(initial_infeed_sleep_secs,
                                      'TPUConfig initial_infeed_sleep_secs')

    # Fall back to the job name from TF_CONFIG when none is given explicitly.
    tpu_job_name = tpu_job_name or _get_tpu_job_name_from_tf_config()

    return super(TPUConfig, cls).__new__(
        cls,
        iterations_per_loop=iterations_per_loop,
        num_shards=num_shards,
        num_cores_per_replica=num_cores_per_replica,
        per_host_input_for_training=per_host_input_for_training,
        tpu_job_name=tpu_job_name,
        initial_infeed_sleep_secs=initial_infeed_sleep_secs,
        input_partition_dims=input_partition_dims,
        eval_training_input_configuration=eval_training_input_configuration,
        experimental_host_call_every_n_steps=(
            experimental_host_call_every_n_steps),
        experimental_allow_per_host_v2_parallel_get_next=(
            experimental_allow_per_host_v2_parallel_get_next),
        experimental_feed_hook=(experimental_feed_hook))


@estimator_export(v1=['estimator.tpu.RunConfig'])
class RunConfig(run_config_lib.RunConfig):
  """A `RunConfig` that additionally carries TPU-specific settings."""

  def __init__(self,
               tpu_config=None,
               evaluation_master=None,
               master=None,
               cluster=None,
               **kwargs):
    """Constructs a RunConfig.

    Args:
      tpu_config: the TPUConfig that specifies TPU-specific configuration. A
        default-constructed TPUConfig is used when omitted.
      evaluation_master: a string. The address of the master to use for eval.
        Defaults to master if not set.
      master: a string. The address of the master to use for training. Must not
        be combined with `cluster`.
      cluster: a ClusterResolver
      **kwargs: keyword config parameters, forwarded to the parent constructor.

    Raises:
      ValueError: if both master and cluster are given, or if cluster is not
        None and the provided session_config has a cluster_def already.
    """
    super(RunConfig, self).__init__(**kwargs)
    self._tpu_config = tpu_config if tpu_config else TPUConfig()
    self._cluster = cluster

    # An explicitly passed master (even the empty string '') wins over the
    # value computed by the parent class; otherwise ask the resolver, if any.
    master_given = master is not None
    if master_given and cluster is not None:
      raise ValueError('Both master and cluster are set.')
    if master_given:
      self._master = master
    elif cluster:
      self._master = cluster.master()

    # Same precedence for evaluation_master: explicit value first.
    if evaluation_master is not None:
      self._evaluation_master = evaluation_master
    elif not (self._evaluation_master or
              self.task_type == run_config_lib.TaskType.EVALUATOR):
      # An EVALUATOR task type means a cluster manager supplied TF_CONFIG, and
      # that configuration is respected as is. In every other case the code
      # runs without an external cluster manager, so evaluation defaults to
      # the training master unless the user overrides it.
      self._evaluation_master = self._master

    if not cluster:
      return

    # A resolver was supplied: it dictates the ClusterSpec to use.
    self._cluster_spec = cluster.cluster_spec()

    # Fold the cluster definition into the session ConfigProto, creating the
    # proto first when the parent class left it unset.
    if self._session_config is None:  # pylint: disable=access-member-before-definition
      self._session_config = config_pb2.ConfigProto(
          allow_soft_placement=True, isolate_session_state=True)
    if self._session_config.HasField('cluster_def'):
      raise ValueError('You cannot provide a ClusterResolver and '
                       'session_config.cluster_def.')
    if self._cluster_spec:
      cluster_def = self._cluster_spec.as_cluster_def()
      self._session_config.cluster_def.CopyFrom(cluster_def)

  def _maybe_overwrite_session_config_for_distributed_training(self):
    """Deliberately does nothing.

    Overrides the parent class hook that overwrites session_config for
    between-graph training. TPU runs with in-graph, which should not have a
    device filter, so the overwrite is disabled by leaving this body empty.
    """

  @property
  def evaluation_master(self):
    """The master address used for evaluation."""
    return self._evaluation_master

  @property
  def master(self):
    """The master address used for training."""
    return self._master

  @property
  def tpu_config(self):
    """The TPUConfig attached to this RunConfig."""
    return self._tpu_config

  @property
  def cluster(self):
    """The ClusterResolver passed to the constructor, or None."""
    return self._cluster

  def replace(self, **kwargs):
    """Returns a copy with the given properties replaced.

    `tpu_config` is handled here; every other keyword is delegated to the
    parent class `replace`.
    """
    swap_tpu_config = 'tpu_config' in kwargs
    replacement = kwargs.pop('tpu_config', None)

    updated = super(RunConfig, self).replace(**kwargs)
    if swap_tpu_config:
      updated._tpu_config = replacement  # pylint: disable=protected-access
    return updated


def _get_tpu_job_name_from_tf_config():
  """Looks up the TPU worker job name in the TF_CONFIG environment variable.

  Returns:
    The value stored under the TPU worker job name key of the service section
    of TF_CONFIG, or None when the variable, the section, or the key is absent.
  """
  # TODO(xiejw): Extends this to support both TF_CONFIG env variable and cluster
  # spec propagation.
  raw_config = os.environ.get(_TF_CONFIG_ENV, '{}')
  service_section = json.loads(raw_config).get(_SERVICE_KEY, {})
  job_name = service_section.get(_TPU_WORKER_JOB_NAME)
  if not job_name:
    # Nothing usable was configured; hand back the falsy value unchanged.
    return job_name
  tf.compat.v1.logging.info('Load TPU job name from TF_CONFIG: %s', job_name)
  return job_name
